package com.micdp;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.io.IOUtils;
import org.apache.log4j.Logger;

import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.io.UnsupportedEncodingException;
import java.nio.charset.StandardCharsets;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Date;
import java.util.List;

/**
 * HDFS client: a thin convenience wrapper around the Hadoop {@link FileSystem}
 * API for a single cluster. Supports directory/file creation, append-style
 * writes, uploads from the local file system, renames, deletes and status
 * listings.
 *
 * <p>Call {@link #close()} when finished to release the underlying connection.
 * Thread-safety is whatever the underlying {@link FileSystem} provides.
 *
 * Created by hadoop on 2017/8/25/025.
 */
public class HdfsClient {

    private static final Logger logger = Logger.getLogger(HdfsClient.class);

    // Hadoop configuration used to build the FileSystem handle.
    private Configuration conf = null;
    // Shared FileSystem handle, created once in the constructor.
    private FileSystem fs = null;

    /** Creates a client bound to the default name node address. */
    public HdfsClient() {
        this("hdfs://hadoop1:8020");
    }

    /**
     * Creates a client bound to the given name node address.
     *
     * @param fsDefaultFS value for {@code fs.defaultFS}, e.g. {@code hdfs://host:8020}
     * @throws RuntimeException if the FileSystem cannot be initialized
     */
    public HdfsClient(String fsDefaultFS) {
        try {
            conf = new Configuration();
            conf.set("fs.defaultFS", fsDefaultFS);
            // set append is true.
            conf.setBoolean("dfs.support.append", true);
            // Tolerate datanode failures while appending on small clusters.
            conf.set("dfs.client.block.write.replace-datanode-on-failure.policy", "NEVER");
            conf.set("dfs.client.block.write.replace-datanode-on-failure.enable", "true");
            conf.set("fs.hdfs.impl", "org.apache.hadoop.hdfs.DistributedFileSystem");
            fs = FileSystem.get(conf);
        } catch (IOException e) {
            // Chain the cause so the original stack trace is not lost.
            throw new RuntimeException(
                    "Failed to initialize FileSystem for hdfs,Please to config again." + e.getMessage(), e);
        }
    }

    /**
     * Creates a directory (and any missing parents) if it does not already exist.
     *
     * @param dir it must be a absolute path
     * @return {@code true} if the directory exists or was created successfully
     */
    public boolean mkdir(String dir) {
        try {
            Path path = new Path(dir);
            if (fs.exists(path)) {
                return true;
            }
            // Report the actual result instead of assuming success.
            return fs.mkdirs(path);
        } catch (IllegalArgumentException e) {
            logger.error("The Param 'filePath' is illegal", e);
        } catch (IOException e) {
            logger.error("Failed to mkdir", e);
        }
        return false;
    }

    /**
     * Creates an empty file.
     *
     * @param src file path
     * @param overWrite whether over write is or not
     * @return whether to success to create
     */
    public boolean createFile(String src, boolean overWrite) {
        try {
            Path path = new Path(src);
            if (!overWrite && fs.exists(path)) {
                logger.info("Existed the Hdfs file: " + path.toString());
                return false;
            }
            // Close the stream returned by create() immediately: leaving it
            // open leaks the output stream and holds the HDFS lease.
            fs.create(path, overWrite).close();
            return true;
        } catch (IllegalArgumentException e) {
            logger.error("The Param 'filePath' is illegal", e);
        } catch (IOException e) {
            logger.error("Failed to create a file", e);
        }
        return false;
    }

    /**
     * Creates an empty file without overwriting an existing one.
     *
     * @param src file path
     * @return whether to success to create
     */
    public boolean createFile(String src) {
        return createFile(src, false);
    }

    /**
     * Opens a writable stream to the given path.
     *
     * <p>With {@code overWrite == true} the file is truncated/created; otherwise an
     * existing regular file is opened for append, and a missing file is created.
     * The caller owns the returned stream and must close it (see {@link #close(FSDataOutputStream)}).
     *
     * @param hdfsPath file path
     * @param overWrite whether to over write
     * @return FSDataOutputStream, or {@code null} on error
     */
    public FSDataOutputStream getFSDataOutputStream(String hdfsPath, boolean overWrite) {
        FSDataOutputStream out = null;
        try {
            Path path = new Path(hdfsPath);
            if (overWrite) {
                out = fs.create(path, overWrite);
            } else {
                if (fs.exists(path) && fs.isFile(path)) {
                    out = fs.append(path);
                } else {
                    out = fs.create(path, overWrite);
                }
            }
        } catch (IllegalArgumentException e) {
            out = null;
            logger.error("The Param 'filePath' is illegal", e);
        } catch (IOException e) {
            out = null;
            logger.error("Failed to get FSDataOutputStream", e);
        }
        return out;
    }

    /**
     * Closes an output stream, logging (not throwing) on failure.
     *
     * @param out stream to close; {@code null} is tolerated
     * @return {@code true} only if a non-null stream closed cleanly
     */
    public boolean close(FSDataOutputStream out) {
        if (null != out) {
            try {
                out.close();
                return true;
            } catch (IOException e) {
                logger.error("Failed to close a FSDataOutputStream", e);
            }
        }
        return false;
    }

    /**
     * Writes bytes to a file, creating or appending as requested.
     *
     * @param src hdfs file
     * @param contents to write
     * @param overwrite whether to over write, if false, it will append
     * @return whether the write (including close) succeeded
     */
    public boolean writeFile(String src, byte[] contents, boolean overwrite) {
        FSDataOutputStream outputStream = getFSDataOutputStream(src, overwrite);
        if (null == outputStream) {
            // getFSDataOutputStream already logged the failure; avoid an NPE here.
            return false;
        }
        boolean success = false;
        try {
            outputStream.write(contents);
            outputStream.flush();
            success = true;
        } catch (IOException e) {
            logger.error("Failed to write file[" + src + "]", e);
        } finally {
            // A failed close means the data may not be durable — report failure.
            if (!close(outputStream)) {
                success = false;
            }
        }
        return success;
    }

    /**
     * Recommend to use this method.
     * Writes a string to a file as plain UTF-8 bytes, creating or appending as requested.
     *
     * <p>Note: this intentionally does NOT use {@code writeUTF()}, which prepends a
     * 2-byte length and emits modified UTF-8 — unsuitable for plain text files and
     * inconsistent with {@link #readFile(String)}'s raw byte copy.
     *
     * @param hdfsPath hdsf file
     * @param contents to write
     * @param overwrite whether to over write, if false, it will append
     * @return whether the write succeeded
     */
    public boolean writeFile(String hdfsPath, String contents, boolean overwrite) {
        return writeFile(hdfsPath, contents.getBytes(StandardCharsets.UTF_8), overwrite);
    }

    /**
     * Recommend to use this method.
     * Appends bytes to the specified hdfs file.
     *
     * @param src hsdf file
     * @param contents to write
     * @return whether the append succeeded
     */
    public boolean append(String src, byte[] contents) {
        return writeFile(src, contents, false);
    }

    /**
     * Appends a string to the specified hdfs file.
     *
     * @param src hdfs file
     * @param contents to write
     * @return whether the append succeeded
     */
    public boolean append(String src, String contents) {
        return writeFile(src, contents, false);
    }

    /**
     * Reads a file and copies its raw bytes to {@code System.out}.
     *
     * @param src hdsf file
     */
    public void readFile(String src) {
        InputStream in = null;
        try {
            Path srcPath = new Path(src);
            in = fs.open(srcPath);
            // copy to System out stream
            IOUtils.copyBytes(in, System.out, 4096, false);
        } catch (IOException e) {
            logger.error("Failed to read hdfs file", e);
        } finally {
            IOUtils.closeStream(in);
        }
    }

    /**
     * Copies a local file to an hdfs file.
     *
     * @param delSrc whether to delete local file
     * @param overwrite whether to over write hdfs file
     * @param src local file
     * @param dst hdfs file
     * @return whether to success
     */
    public boolean uploadLocalFile(boolean delSrc, boolean overwrite, String src, String dst) {
        boolean success = false;
        try {
            Path srcPath = new Path(src);
            Path dstPath = new Path(dst);
            // The source is on the LOCAL file system — checking it through the
            // HDFS handle (fs.exists) would look at the wrong file system.
            if (!new File(src).exists()) {
                logger.info("There do not exist local path");
                success = false;
            } else {
                if (overwrite) {
                    fs.copyFromLocalFile(delSrc, overwrite, srcPath, dstPath);
                    success = true;
                } else {
                    if (fs.exists(dstPath)) {
                        logger.info("There existed hdfs file");
                        success = false;
                    } else {
                        fs.copyFromLocalFile(delSrc, srcPath, dstPath);
                        success = true;
                    }
                }
            }
        } catch (IllegalArgumentException e) {
            success = false;
            logger.error("The Param 'filePath' is illegal", e);
        } catch (IOException e) {
            success = false;
            logger.error("Failed to copy from local file to hdfs file", e);
        }
        return success;
    }

    /**
     * Copies several local files into an hdfs directory.
     *
     * @param delSrc whether to delete srcs when to copy
     * @param overwrite whether to over write hdfs file
     * @param srcs local files
     * @param dst hdfs file
     * @return whether to success to copy
     */
    public boolean uploadLocalFile(boolean delSrc, boolean overwrite, String[] srcs, String dst) {
        Path[] srcPaths = convert2Paths(srcs);
        if (null == srcPaths) {
            return false;
        }
        boolean success = false;
        try {
            Path dstPath = new Path(dst);
            if (overwrite) {
                fs.copyFromLocalFile(delSrc, overwrite, srcPaths, dstPath);
                success = true;
            } else {
                if (fs.exists(dstPath)) {
                    logger.info("There existed hdfs file");
                    success = false;
                } else {
                    fs.copyFromLocalFile(delSrc, overwrite, srcPaths, dstPath);
                    success = true;
                }
            }
        } catch (IllegalArgumentException e) {
            success = false;
            logger.error("The Param 'dst' is illegal", e);
        } catch (IOException e) {
            success = false;
            logger.error("Failed to copy from local file to hdfs file", e);
        }
        return success;
    }

    /**
     * Converts path strings to {@link Path} objects.
     *
     * @param srcs path strings; may be {@code null} or empty
     * @return the converted paths, or {@code null} when input is null/empty or
     *         contains an illegal name
     */
    public Path[] convert2Paths(String[] srcs) {
        if (null == srcs || 0 == srcs.length) {
            return null;
        }
        try {
            // The previous implementation ignored the array returned by
            // List.toArray(T[]) and handed back an empty array instead.
            Path[] paths = new Path[srcs.length];
            for (int i = 0; i < srcs.length; i++) {
                paths[i] = new Path(srcs[i]);
            }
            return paths;
        } catch (IllegalArgumentException e) {
            logger.error("The Array 'srcs' included illegal file name", e);
            return null;
        }
    }

    /**
     * Renames src to dst.
     *
     * @param src original file name
     * @param dst new file name
     * @return whether to rename is or not
     */
    public boolean rename(String src, String dst) {
        boolean success = false;
        try {
            Path srcPath = new Path(src);
            Path dstPath = new Path(dst);
            // fs.rename() reports failure via its return value, not an exception.
            success = fs.rename(srcPath, dstPath);
        } catch (IllegalArgumentException e) {
            success = false;
            logger.error("The Param 'filePath' is illegal", e);
        } catch (IOException e) {
            success = false;
            logger.error("Failed to rename hdfs file", e);
        }
        return success;
    }

    /**
     * Deletes a hdfs file or directory.
     *
     * @param src hdfs file to delete
     * @param recursive whether is recursive or not
     * @return whether to delete
     */
    public boolean delete(String src, boolean recursive) {
        boolean success = false;
        try {
            Path srcPath = new Path(src);
            // Propagate the real outcome; the old code always returned false.
            success = fs.delete(srcPath, recursive);
        } catch (IllegalArgumentException e) {
            success = false;
            logger.error("The Param 'src' is illegal", e);
        } catch (IOException e) {
            success = false;
            logger.error("Failed to delete hdfs file", e);
        }
        return success;
    }

    /**
     * Deletes a regular hdfs file (refuses directories).
     *
     * @param src hdfs file to delete
     * @return whether to delete
     */
    public boolean deleteFile(String src) {
        try {
            Path path = new Path(src);
            if (fs.isFile(path)) {
                return delete(src, false);
            } else {
                logger.info("The param 'src' is not regular file");
            }
        } catch (IllegalArgumentException e) {
            logger.error("The Param 'src' is illegal", e);
        } catch (IOException e) {
            logger.error("Failed to delete hdfs file", e);
        }
        return false;
    }

    /**
     * Lists hdfs file status, optionally descending into sub-directories.
     *
     * @param src hdfs file
     * @param recursive whether to recurse into directories
     * @return the statuses found, an empty array for an empty directory, or
     *         {@code null} on error
     */
    public FileStatus[] listStatus(String src, boolean recursive) {
        try {
            Path srcPath = new Path(src);
            if (!recursive) {
                return fs.listStatus(srcPath);
            } else {
                List<FileStatus> fileStatusList = new ArrayList<>();
                FileStatus[] fileStatuses = fs.listStatus(srcPath);
                if (null == fileStatuses || 0 == fileStatuses.length) {
                    return new FileStatus[]{};
                } else {
                    for (FileStatus fileStatus : fileStatuses) {
                        // Directories are listed themselves AND expanded.
                        if (fileStatus.isDirectory()) {
                            fileStatusList.add(fileStatus);
                            fileStatusList.addAll(Arrays.asList(listStatus(fileStatus.getPath().toString(), recursive)));
                        } else {
                            fileStatusList.add(fileStatus);
                        }
                    }
                    return fileStatusList.toArray(new FileStatus[fileStatusList.size()]);
                }
            }
        } catch (IllegalArgumentException e) {
            logger.error("The Param 'src' is illegal", e);
        } catch (IOException e) {
            logger.error("Failed to list status hdfs file", e);
        }
        return null;
    }

    /**
     * Prints hdfs file status details to {@code System.out}.
     *
     * @param src hdfs file
     * @param recursive whether to recurse into directories
     */
    public void showFileStatus(String src, boolean recursive) {
        FileStatus[] fileStatuses = listStatus(src, recursive);
        if (null == fileStatuses) {
            return;
        }
        try {
            SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
            for (FileStatus fileStatus : fileStatuses) {
                String dir = fileStatus.isDirectory() ? "Directory" : "Regular File";
                String name = fileStatus.getPath().getName();
                String path = fileStatus.getPath().toString();
                System.out.println(name + "----" + dir + "  path:" + path);
                System.out.println("Access time: " + fileStatus.getAccessTime());
                System.out.println("Block size: " + fileStatus.getBlockSize());
                System.out.println("Group: " + fileStatus.getGroup());
                System.out.println("Length: " + fileStatus.getLen());
                System.out.println("Modified time: " + sdf.format(new Date(fileStatus.getModificationTime())));
                System.out.println("Owner: " + fileStatus.getOwner());
                System.out.println("Permission: " + fileStatus.getPermission());
                System.out.println("Replication: " + fileStatus.getReplication());
                if (fileStatus.isSymlink()) {
                    System.out.println("Symlink: " + fileStatus.getSymlink());
                }
            }
        } catch (IOException e) {
            logger.error("Failed to show file status", e);
        }
    }

    /**
     * Gets DataNode information.
     *
     * @return the datanode stats, or {@code null} when unavailable
     */
    public DatanodeInfo[] getAllDataNodesInfo() {
        // Guard the cast: fs is only a DistributedFileSystem when talking to
        // a real HDFS cluster (not, e.g., a local file system).
        if (!(fs instanceof DistributedFileSystem)) {
            logger.error("FileSystem is not a DistributedFileSystem; cannot get datanode information");
            return null;
        }
        try {
            DistributedFileSystem hdfs = (DistributedFileSystem) fs;
            return hdfs.getDataNodeStats();
        } catch (IOException e) {
            logger.error("Failed to get datanode information", e);
        }
        return null;
    }

    /**
     * Shows DataNode information on {@code System.out}.
     */
    public void showAllDataNodesInfo() {
        DatanodeInfo[] dns = getAllDataNodesInfo();
        if (null == dns) {
            System.err.println("Not found any dataNode information.");
            return;
        }
        for (int i = 0; i < dns.length; i++) {
            System.out.println("datanode_" + i + "_name: " + dns[i].getHostName());
        }
    }

    /**
     * Closes the client and its underlying FileSystem handle.
     */
    public void close() {
        try {
            if (null != fs) {
                fs.close();
            }
        } catch (IOException e) {
            // Use the class logger rather than printStackTrace().
            logger.error("Failed to close FileSystem", e);
        }
    }

    public static void main(String[] args) throws UnsupportedEncodingException {
        HdfsClient client = new HdfsClient();
//        client.showAllDataNodesInfo();
//        client.createFile("/test/good/hello.txt");
//        client.deleteFile("/test/hello.txt");
//        client.delete("/test/good", true);
//        client.showFileStatus("/test", false);
//        System.out.println(client.mkdir("/test/good"));

        // write and read
//        client.deleteFile("/test/nice.txt");
        client.append("/test/nice.txt", "hello world, 中文".getBytes(StandardCharsets.UTF_8));
        client.readFile("/test/nice.txt");
        client.close();
    }

}
